/* Copyright (C) 2014 InfiniDB, Inc.

   This program is free software; you can redistribute it and/or
   modify it under the terms of the GNU General Public License
   as published by the Free Software Foundation; version 2 of
   the License.

   This program is distributed in the hope that it will be useful,
   but WITHOUT ANY WARRANTY; without even the implied warranty of
   MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
   GNU General Public License for more details.

   You should have received a copy of the GNU General Public License
   along with this program; if not, write to the Free Software
   Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
   MA 02110-1301, USA. */

/***********************************************************************
 *   $Id: functioncolumn.cpp 9679 2013-07-11 22:32:03Z zzhu $
 *
 *
 ***********************************************************************/
#include <string>
#include <iostream>
#include <sstream>
using namespace std;

#include <boost/tokenizer.hpp>
#include <boost/algorithm/string.hpp>
using namespace boost;

#include "functor_json.h"
#include "bytestream.h"
#include "functioncolumn.h"
#include "constantcolumn.h"
#include "arithmeticcolumn.h"
#include "simplecolumn.h"
#include "objectreader.h"
#include "calpontselectexecutionplan.h"
#include "simplefilter.h"
#include "aggregatecolumn.h"
#include "windowfunctioncolumn.h"

#include "funcexp.h"
#include "functor_export.h"
#include "functor_str.h"
using namespace funcexp;


namespace execplan
{
/**
 * Constructors/Destructors
 */
// Default constructor.
//
// Leaves the function name, SQL text and parameter list empty. The functor
// pointer is nulled explicitly, exactly as the (name, parameters, session)
// constructor does, so it never holds an indeterminate value before
// unserialize() or a caller assigns one.
FunctionColumn::FunctionColumn() : fFunctor(0)
{
}

// Constructs a function column that only knows its function name.
//
// @param funcName name of the function; copied into fFunctionName.
//
// No parameters are parsed and fData stays empty. The functor pointer is
// nulled explicitly, matching the (name, parameters, session) constructor, so
// it is never left indeterminate.
FunctionColumn::FunctionColumn(string& funcName) : fFunctionName(funcName), fFunctor(0)
{
}

// Constructs a function column from a function name and its raw argument text.
//
// @param functionName      name of the function.
// @param funcParmsInString argument list as text, without the enclosing
//                          parentheses.
// @param sessionID         session identifier forwarded to ReturnedColumn.
//
// fData is assembled as "<functionName>(<funcParmsInString>)" and the argument
// text is then parsed into fFunctionParms by funcParms().
FunctionColumn::FunctionColumn(const string& functionName, const string& funcParmsInString,
                               const uint32_t sessionID)
 : ReturnedColumn(sessionID)
 , fFunctionName(functionName)
 , fData(functionName + "(" + funcParmsInString + ")")
 , fFunctor(NULL)
{
  this->funcParms(funcParmsInString);
}

// Copy constructor that binds the copy to the given session.
//
// @param rhs       column to copy from.
// @param sessionID session identifier forwarded to ReturnedColumn.
//
// Name, table alias, SQL text, time zone and alias are copied from rhs. Every
// parameter parse tree is copied into a new ParseTree and then walked so the
// simple-column, aggregate-column and window-function-column lists are rebuilt
// from the copy's own trees.
//
// NOTE(review): fFunctor is copied as a raw pointer and fDynamicFunctor is not
// copied here. If rhs.fFunctor refers to rhs.fDynamicFunctor (unserialize()
// can set both to the same object), the copy does not own that functor --
// confirm the lifetime is acceptable for callers.
FunctionColumn::FunctionColumn(const FunctionColumn& rhs, const uint32_t sessionID)
 : ReturnedColumn(rhs, sessionID)
 , fFunctionName(rhs.functionName())
 , fTableAlias(rhs.tableAlias())
 , fData(rhs.data())
 , fTimeZone(rhs.timeZone())
 , fFunctor(rhs.fFunctor)
{
  // Start from empty containers before rebuilding them from rhs.
  fFunctionParms.clear();
  fSimpleColumnList.clear();
  fAggColumnList.clear();
  fWindowFunctionColumnList.clear();

  for (const SPTP& source : rhs.fFunctionParms)
  {
    SPTP clone(new ParseTree(*source));
    fFunctionParms.push_back(clone);
    clone->walk(getSimpleCols, &fSimpleColumnList);
    clone->walk(getAggCols, &fAggColumnList);
    clone->walk(getWindowFunctionCols, &fWindowFunctionColumnList);
  }

  fAlias = rhs.alias();
}

// Destroys the column and releases the functor held in fDynamicFunctor, if any.
FunctionColumn::~FunctionColumn()
{
  // Deleting a null pointer is a no-op, so no explicit guard is required.
  delete fDynamicFunctor;
}

/**
 * Methods
 */
// Streams the textual form produced by FunctionColumn::toString().
//
// @param output destination stream.
// @param rhs    column to print.
// @return output, to allow chaining.
ostream& operator<<(ostream& output, const FunctionColumn& rhs)
{
  return output << rhs.toString();
}

// Renders a multi-line, human-readable description of this column.
//
// The text lists the function name, the alias (only when non-empty), the
// expression id, join info, return-all flag, sequence number, result type and
// width, operation type, and finally one line per function parameter.
//
// @return the description as a string.
const string FunctionColumn::toString() const
{
  ostringstream text;
  text << '\n' << "FunctionColumn: " << fFunctionName << '\n';

  // The alias is written without a trailing line break, as before.
  if (!fAlias.empty())
    text << "/Alias: " << fAlias;

  text << "expressionId=" << fExpressionId << '\n';
  text << "joinInfo=" << fJoinInfo << " returnAll=" << fReturnAll << " sequence#=" << fSequence << '\n';
  text << "resultType=" << colDataTypeToString(fResultType.colDataType) << "|" << fResultType.colWidth
       << '\n';
  text << "operationType=" << colDataTypeToString(fOperationType.colDataType) << '\n';
  text << "function parm: " << '\n';

  for (const SPTP& parm : fFunctionParms)
    text << parm->data()->toString() << '\n';

  return text.str();
}

// Emits a C++ expression that reconstructs this column through the
// (name, parameters, session) constructor.
//
// @param includes set of header names required by the generated code;
//                 "functioncolumn.h" is added to it.
// @return text of the form FunctionColumn("<name>", "<params>", <sessionID>).
//
// The parameter text is recovered from fData, which the three-argument
// constructor builds as "<name>(<params>)". fData can be shorter than that
// (for example on an object created through one of the other constructors),
// in which case the original substr() call threw std::out_of_range; an empty
// parameter string is emitted instead.
string FunctionColumn::toCppCode(IncludeSet& includes) const
{
  includes.insert("functioncolumn.h");

  // Length of "<name>(" -- the parameters start right after it.
  const string::size_type prefixLen = fFunctionName.size() + 1;
  string funcParmsInString;

  // Only slice when there is room for both the '(' and the trailing ')'.
  if (fData.size() > prefixLen)
    funcParmsInString = fData.substr(prefixLen, fData.size() - prefixLen - 1);

  stringstream ss;
  ss << "FunctionColumn(" << std::quoted(fFunctionName) << ", " << std::quoted(funcParmsInString) << ", "
     << sessionID() << ")";

  return ss.str();
}

// Returns a copy of the SQL text stored in fData.
const string FunctionColumn::data() const
{
  return string(fData);
}

void FunctionColumn::funcParms(const string& funcParmsInString)
{
  // special process to interval function. make the passed in string one literal constant
  if (fFunctionName.compare("interval") == 0)
  {
    SPTP sptp(new ParseTree(new ConstantColumn(funcParmsInString, ConstantColumn::LITERAL)));
    fFunctionParms.push_back(sptp);
    return;
  }

  // try to replace the comma and space in '' to avoid string conflict
  // @bug #396 for space delimited function arguments.
  string funcParam = funcParmsInString;
  string::size_type pos1 = 0;
  string::size_type pos2 = 0;
  string::size_type pos3 = 0;

  while (pos2 < funcParam.length() && pos1 != string::npos)
  {
    pos1 = funcParam.find_first_of("'", pos2);
    pos2 = funcParam.find_first_of("'", pos1 + 1);
    pos3 = funcParam.find_first_of(", ", pos1);

    if (pos3 < pos2)
    {
      if (funcParam[pos3] == ',')
        funcParam.replace(pos3, 1, "^");
      else if (funcParam[pos3] == ' ')
        funcParam[pos3] = 0x1f;  // special char to replace space in quotes
    }

    pos2++;
  }

  // also replace the comma and space in () to avoid function conflict
  unsigned int par = 0;

  for (unsigned int i = 0; i < funcParam.length(); i++)
  {
    if (funcParam[i] == '(')
      par++;

    if (funcParam[i] == ')')
      par--;

    if (funcParam[i] == ',' && par != 0)
      funcParam.replace(i, 1, "^");

    if (funcParam[i] == ' ' && par != 0)
      funcParam[i] = 0x1f;
  }

  typedef boost::tokenizer<boost::char_separator<char> > tokenizer;
  boost::char_separator<char> sep(", ");
  tokenizer tokens(funcParam, sep);

  for (tokenizer::iterator tok_iter = tokens.begin(); tok_iter != tokens.end(); ++tok_iter)
  {
    string tok = (*tok_iter);
    // remove the to_upper line. don't know why have it in the first place. for bug #320
    // to_upper(tok);

    // take off the trailing and ending spaces
    std::string::size_type first = tok.find_first_not_of(" ");
    tok = tok.substr(first, tok.find_last_not_of(" ") - first + 1);

    // recover '^' to ',' and recover 0x1f to ' '
    for (unsigned int i = 0; i < tok.length(); i++)
    {
      if (tok[i] == '^')
        tok[i] = ',';

      if (tok[i] == 0x1f)
        tok[i] = ' ';
    }

    // parse argument
    if (tok[0] == '\'' && tok[tok.length() - 1] == '\'')
    {
      // take off quotes
      first = tok.find_first_not_of("'");
      tok = tok.substr(first, tok.find_last_not_of("'") - first + 1);
      SPTP cc(new ParseTree(new ConstantColumn(tok, ConstantColumn::LITERAL)));
      fFunctionParms.push_back(cc);
    }
    else if (tok.find("+", 0) != string::npos || tok.find("-", 0) != string::npos ||
             tok.find("*", 0) != string::npos || tok.find("/", 0) != string::npos ||
             tok.find("^", 0) != string::npos || tok.find("(", 0) != string::npos ||
             tok.find(")", 0) != string::npos)
    {
      SPTP ac(new ParseTree(new ArithmeticColumn(tok)));
      fFunctionParms.push_back(ac);
    }
    else if (isdigit(tok[0]) || tok[0] == '.')
    {
      SPTP cc(new ParseTree(new ConstantColumn(tok, ConstantColumn::NUM)));
      fFunctionParms.push_back(cc);
    }
    else if (strcasecmp(tok.c_str(), "NULL") == 0)
    {
      SPTP cc(new ParseTree(new ConstantColumn(tok, ConstantColumn::NULLDATA)));
      fFunctionParms.push_back(cc);
    }
    // map keyword to constant column
    else if (strcasecmp(tok.c_str(), "FROM") == 0 || strcasecmp(tok.c_str(), "TO") == 0)
    {
      SPTP cc(new ParseTree(new ConstantColumn(tok, ConstantColumn::LITERAL)));
      fFunctionParms.push_back(cc);
    }
    // simplecolumn
    else
    {
      SPTP sc(new ParseTree(new SimpleColumn(tok, fSessionID)));
      fFunctionParms.push_back(sc);
    }
  }
}

// Writes this column to a byte stream.
//
// @param b destination stream.
//
// Layout, in order: the FUNCTIONCOLUMN type id, the ReturnedColumn part, the
// function name, the parameter count as a 32-bit value, each parameter parse
// tree, the table alias, the SQL text, and the time zone as an octbyte.
// unserialize() reads the fields back in the same order.
void FunctionColumn::serialize(messageqcpp::ByteStream& b) const
{
  b << static_cast<ObjectReader::id_t>(ObjectReader::FUNCTIONCOLUMN);
  ReturnedColumn::serialize(b);
  b << fFunctionName;

  b << static_cast<uint32_t>(fFunctionParms.size());

  for (const SPTP& parm : fFunctionParms)
    ObjectReader::writeParseTree(parm.get(), b);

  b << fTableAlias;
  b << fData;

  // Widen the time zone to the stream's 8-byte integer type before writing.
  messageqcpp::ByteStream::octbyte tzValue = fTimeZone;
  b << tzValue;
}

// Rebuilds this column from a byte stream written by serialize().
//
// @param b source stream, positioned at a serialized FunctionColumn.
//
// The parameter list and the simple/aggregate/window-function column lists are
// cleared and rebuilt from the stream. The functor is then looked up by
// function name through FuncExp and given a chance to fix() this column. For a
// fixed set of functor types (rand, encode, decode and several JSON functions)
// a private instance is allocated, stored in fDynamicFunctor and used as
// fFunctor; the destructor deletes fDynamicFunctor.
//
// Fix over the previous version: an fDynamicFunctor left over from an earlier
// unserialize() on the same object is deleted before a new functor is
// selected, instead of being overwritten and leaked.
//
// NOTE(review): the result of getFunctor() is dereferenced without a null
// check -- confirm it cannot return null for an unknown function name.
void FunctionColumn::unserialize(messageqcpp::ByteStream& b)
{
  fFunctionParms.clear();
  fSimpleColumnList.clear();
  fAggColumnList.clear();
  fWindowFunctionColumnList.clear();

  ObjectReader::checkType(b, ObjectReader::FUNCTIONCOLUMN);
  ReturnedColumn::unserialize(b);
  b >> fFunctionName;

  uint32_t size = 0;
  b >> size;

  for (uint32_t i = 0; i < size; i++)
  {
    SPTP pt(ObjectReader::createParseTree(b));
    fFunctionParms.push_back(pt);
    pt->walk(getSimpleCols, &fSimpleColumnList);
    pt->walk(getAggCols, &fAggColumnList);
    pt->walk(getWindowFunctionCols, &fWindowFunctionColumnList);
  }

  b >> fTableAlias;
  b >> fData;
  messageqcpp::ByteStream::octbyte timeZone;
  b >> timeZone;
  fTimeZone = timeZone;

  // Release the instance owned from a previous unserialize(), if any, before
  // fFunctor is re-pointed below.
  delete fDynamicFunctor;
  fDynamicFunctor = NULL;

  FuncExp* funcExp = FuncExp::instance();
  fFunctor = funcExp->getFunctor(fFunctionName);
  fFunctor->fix(*this);

  // @bug 3506. Special treatment for rand() function. reset the seed
  if (dynamic_cast<Func_rand*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_rand();

  if (dynamic_cast<Func_encode*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_encode();

  if (dynamic_cast<Func_decode*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_decode();

  // Special treatment for JSON functions that contain a variable path: a new
  // instance resets that path.
  if (dynamic_cast<Func_json_length*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_length();

  if (dynamic_cast<Func_json_keys*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_keys();

  if (dynamic_cast<Func_json_exists*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_exists();

  if (dynamic_cast<Func_json_value*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_value();

  if (dynamic_cast<Func_json_query*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_query();

  if (dynamic_cast<Func_json_contains*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_contains();

  if (dynamic_cast<Func_json_array_append*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_array_append();

  if (dynamic_cast<Func_json_array_insert*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_array_insert();

  // The insert functor is rebuilt with the mode of the shared instance.
  if (auto f = dynamic_cast<Func_json_insert*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_insert(f->getMode());

  if (dynamic_cast<Func_json_remove*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_remove();

  if (dynamic_cast<Func_json_contains_path*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_contains_path();

  if (dynamic_cast<Func_json_search*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_search();

  if (dynamic_cast<Func_json_extract*>(fFunctor))
    fFunctor = fDynamicFunctor = new Func_json_extract();
}

// Compares two function columns member by member.
//
// @param t column to compare against.
// @return true when the ReturnedColumn parts, function names, parameter trees
//         (pairwise, in order), table aliases, SQL texts and time zones all
//         compare equal.
bool FunctionColumn::operator==(const FunctionColumn& t) const
{
  // Compare the base-class part through ReturnedColumn's own operator.
  const ReturnedColumn& lhsBase = static_cast<const ReturnedColumn&>(*this);
  const ReturnedColumn& rhsBase = static_cast<const ReturnedColumn&>(t);

  if (lhsBase != rhsBase)
    return false;

  if (fFunctionName != t.fFunctionName || fFunctionParms.size() != t.fFunctionParms.size())
    return false;

  FunctionParm::const_iterator mine = fFunctionParms.begin();
  FunctionParm::const_iterator theirs = t.fFunctionParms.begin();

  while (mine != fFunctionParms.end())
  {
    if (**mine != **theirs)
      return false;

    ++mine;
    ++theirs;
  }

  if (fTableAlias != t.fTableAlias || fData != t.fData || fTimeZone != t.fTimeZone)
    return false;

  return true;
}

// Compares this column with an arbitrary tree node.
//
// @param t node to compare against.
// @return false when t is not a FunctionColumn (including a null pointer);
//         otherwise the result of the FunctionColumn-to-FunctionColumn
//         comparison.
bool FunctionColumn::operator==(const TreeNode* t) const
{
  const FunctionColumn* other = dynamic_cast<const FunctionColumn*>(t);
  return other != NULL && *this == *other;
}

// Negation of operator==(const FunctionColumn&).
bool FunctionColumn::operator!=(const FunctionColumn& t) const
{
  const bool same = (*this == t);
  return !same;
}

// Negation of operator==(const TreeNode*).
bool FunctionColumn::operator!=(const TreeNode* t) const
{
  const bool same = (*this == t);
  return !same;
}

bool FunctionColumn::hasAggregate()
{
  if (fHasAggregate)
    return true;

  fAggColumnList.clear();

  for (uint32_t i = 0; i < fFunctionParms.size(); i++)
    fFunctionParms[i]->walk(getAggCols, &fAggColumnList);

  if (!fAggColumnList.empty())
    fHasAggregate = true;

  return fHasAggregate;
}

bool FunctionColumn::hasWindowFunc()
{
  fWindowFunctionColumnList.clear();

  for (uint32_t i = 0; i < fFunctionParms.size(); i++)
    fFunctionParms[i]->walk(getWindowFunctionCols, &fWindowFunctionColumnList);

  if (fWindowFunctionColumnList.empty())
    return false;

  return true;
}

void FunctionColumn::setDerivedTable()
{
  if (hasAggregate())
  {
    fDerivedTable = "";
    return;
  }

  setSimpleColumnList();
  string derivedTableAlias = "";

  for (uint32_t i = 0; i < fSimpleColumnList.size(); i++)
  {
    SimpleColumn* sc = fSimpleColumnList[i];
    sc->setDerivedTable();

    if (sc->derivedTable() != derivedTableAlias)
    {
      if (derivedTableAlias == "")
      {
        derivedTableAlias = sc->tableName();
      }
      else
      {
        derivedTableAlias = "";
        break;
      }
    }
    // MCOL-3239 Block for func column with both
    // derived table column and normal table column.
    else if (derivedTableAlias == "")
    {
      if (sc->tableAlias().length())
      {
        derivedTableAlias = "";
        break;
      }
    }
  }

  fDerivedTable = derivedTableAlias;
}

// Applies replaceRefCol() to every parameter parse tree.
//
// @param derivedColList column list handed through to replaceRefCol().
void FunctionColumn::replaceRealCol(CalpontSelectExecutionPlan::ReturnedColumnList& derivedColList)
{
  for (const SPTP& parm : fFunctionParms)
  {
    // Kept as a named local, as in the original call.
    ParseTree* node = parm.get();
    replaceRefCol(node, derivedColList);
  }
}

void FunctionColumn::setSimpleColumnList()
{
  fSimpleColumnList.clear();

  for (uint i = 0; i < fFunctionParms.size(); i++)
    fFunctionParms[i]->walk(getSimpleCols, &fSimpleColumnList);
}

// Checks whether every simple column used by this function belongs to the same
// table.
//
// @param tan output; cleared first, then set to the table alias name of the
//            first simple column whose comparison finds tan.table empty.
// @return false as soon as a column's table alias name differs from tan;
//         true otherwise (including when there are no simple columns).
bool FunctionColumn::singleTable(CalpontSystemCatalog::TableAliasName& tan)
{
  tan.clear();
  setSimpleColumnList();

  for (SimpleColumn* col : fSimpleColumnList)
  {
    CalpontSystemCatalog::TableAliasName current(col->schemaName(), col->tableName(), col->tableAlias(),
                                                 col->viewName());

    if (tan.table.empty())
    {
      tan = current;
      continue;
    }

    if (current != tan)
      return false;
  }

  return true;
}

}  // namespace execplan
